#include <unistd.h>
#include <string.h>
#include <vector>
#include <functional>
#include <fcntl.h>

#include "iomanager.h"
#include "log.h"
#include "macro.h"

namespace qtch{

// File-local logger obtained through QTCH_LOG_NAME("system"); the log
// statements below all write through it.
static Logger::ptr logger = QTCH_LOG_NAME("system");

/**
 * @brief Constructs the IO manager on top of the base Scheduler.
 *
 * Creates the epoll instance and the "tickle" pipe, switches the pipe's read
 * end to non-blocking mode, registers that read end with epoll for
 * edge-triggered readable events, and pre-allocates 32 fd contexts.
 *
 * @param threads    forwarded unchanged to Scheduler
 * @param use_caller forwarded unchanged to Scheduler
 * @param name       forwarded unchanged to Scheduler
 */
IOManager::IOManager(size_t threads, bool use_caller, const std::string & name)
    :Scheduler(threads,use_caller,name){
    m_epfd = epoll_create(5000);
    QTCH_ASSERT(m_epfd > 0);

    int rt = pipe(m_tickleFds);
    QTCH_ASSERT(!rt);

    // Describe the interest set for the read end of the pipe.
    epoll_event tickle_event;
    memset(&tickle_event, 0, sizeof(tickle_event));
    tickle_event.events = EPOLLIN | EPOLLET;
    tickle_event.data.fd = m_tickleFds[0];

    // The read end is made non-blocking before it is handed to epoll.
    rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK);
    QTCH_ASSERT(!rt);

    rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &tickle_event);
    QTCH_ASSERT(!rt);

    contextResize(32);
    QTCH_LOG_DEBUG(logger) << "------------------------";
}

/**
 * @brief Stops the scheduler, closes the epoll fd and both pipe ends, and
 *        frees every FdContext held in m_fdContexts.
 */
IOManager::~IOManager(){
    QTCH_LOG_DEBUG(logger) << "~IOManager name=" << getName();
    stop();

    close(m_epfd);
    close(m_tickleFds[0]);
    close(m_tickleFds[1]);

    for(FdContext* ctx : m_fdContexts){
        // Deleting a null pointer is a no-op, so empty slots need no check.
        delete ctx;
    }
}

/**
 * @brief Registers interest in a single event (READ or WRITE) on a descriptor.
 *
 * When the event later fires, either @p cb is scheduled or, if @p cb is empty,
 * the fiber that called addEvent() is scheduled.
 *
 * @param fd    descriptor to watch; negative values are rejected
 * @param event exactly one of Event::READ / Event::WRITE
 * @param cb    optional callback; when empty the current fiber is stored instead
 * @return 0 on success, -1 on failure (invalid fd, event already registered,
 *         or epoll_ctl error)
 */
int IOManager::addEvent(int fd, Event event, std::function<void()> cb){
    QTCH_ASSERT(event == Event::READ || event == Event::WRITE);
    if(fd < 0){
        // A negative fd would pass the "size() > fd" test and index the table
        // out of bounds.
        QTCH_LOG_ERROR(logger) << "addEvent invalid fd=" << fd;
        return -1;
    }

    FdContext* fd_ctx = nullptr;
    {
        RWMutexType::ReadLock lock1(m_mutex);
        if((int)m_fdContexts.size()>fd){
            fd_ctx = m_fdContexts[fd];
        }
    }
    if(!fd_ctx){
        RWMutexType::WriteLock lock2(m_mutex);
        // Re-check under the write lock: another thread may have grown the
        // table between the two locks, and resizing to fd * 1.5 must never
        // make the table smaller than it already is.
        if((int)m_fdContexts.size()<=fd){
            size_t new_size = static_cast<size_t>(fd * 1.5);
            if(new_size <= static_cast<size_t>(fd)){
                new_size = static_cast<size_t>(fd) + 1;
            }
            contextResize(new_size);
        }
        fd_ctx = m_fdContexts[fd];
    }

    FdContext::MutexType::Lock lock3(fd_ctx->mutex);
    if(fd_ctx->events & event){
        QTCH_LOG_ERROR(logger) << "addEvent assert fd=" << fd
                        << " event=" << event
                        << " fd_ctx.event=" << fd_ctx->events;
        QTCH_ASSERT(!(fd_ctx->events & event));
        // Only reached if the assertion does not abort: registering the same
        // event twice would increment the pending counter twice.
        return -1;
    }

    // First event on this fd -> ADD, otherwise extend the existing registration.
    int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
    epoll_event epevent;
    memset(&epevent,0,sizeof(epoll_event));
    // epoll_event::data is a union, so only the context pointer is stored;
    // idle() reads it back through data.ptr.
    epevent.data.ptr = fd_ctx;
    epevent.events = EPOLLET | fd_ctx->events | event;
    int rt = epoll_ctl(m_epfd, op, fd, &epevent);
    if(rt){
        QTCH_LOG_ERROR(logger) << "epoll_ctl("<<m_epfd<<", "
            << op << ", " << fd << ", " <<epevent.events << "):"
            << rt << " (" << errno << ") (" << strerror(errno) <<") fd_ctx->event="
            << fd_ctx->events;
        return -1;
    }

    ++m_pendingEventCount;
    fd_ctx->events = (Event)(fd_ctx->events | event);

    // Fill the stored slot in place. The unused half (cb vs. fiber) is cleared
    // explicitly so nothing left over from an earlier registration can be
    // picked up when the event is triggered.
    FdContext::EventContext& eventContext = fd_ctx->getContext(event);
    eventContext.scheduler = Scheduler::getThis();
    if(cb){
        eventContext.fiber.reset();
        eventContext.cb.swap(cb);
    }
    else{
        eventContext.cb = nullptr;
        eventContext.fiber = Fiber::GetThis();
        QTCH_ASSERT2(eventContext.fiber->getState() == Fiber::EXEC
                        ,"state = " << eventContext.fiber->getState());
    }
    return 0;
}

/**
 * @brief Shared implementation of delEvent()/cancelEvent(): removes one event
 *        from a descriptor's epoll registration.
 *
 * @param fd    descriptor whose registration is modified
 * @param event exactly one of Event::READ / Event::WRITE
 * @param run   true  -> the stored callback/fiber is triggered before removal;
 *              false -> the stored context is reset without being run
 * @return true if the event was registered and has been removed; false if the
 *         fd is out of range, the event was not registered, or epoll_ctl failed
 */
bool IOManager::_delEvent(int fd,Event event,bool run){
    QTCH_ASSERT(event == Event::READ || event == Event::WRITE);

    FdContext* fd_ctx = nullptr;
    {
        RWMutexType::ReadLock lock1(m_mutex);
        // A negative fd would otherwise pass the size check and index out of bounds.
        if(fd < 0 || (int)m_fdContexts.size()<=fd){
            QTCH_LOG_DEBUG(logger) << "m_fdContexts.size:" <<(int)m_fdContexts.size();
            return false;
        }
        fd_ctx = m_fdContexts[fd];
    }

    FdContext::MutexType::Lock lock2(fd_ctx->mutex);
    if(!(fd_ctx->events & event)){
        return false;
    }

    // Keep the remaining events registered, or drop the fd from epoll entirely.
    Event new_event = (Event)(fd_ctx->events & ~event);
    int op = new_event ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
    epoll_event epevent;
    memset(&epevent,0,sizeof(epoll_event));
    // epoll_event::data is a union; only the context pointer is stored.
    epevent.data.ptr = fd_ctx;
    epevent.events = EPOLLET | new_event;
    int rt = epoll_ctl(m_epfd, op, fd, &epevent);
    if(rt){
        QTCH_LOG_ERROR(logger) << "epoll_ctl("<<m_epfd<<", "
            << op << ", " << fd << ", " <<epevent.events << "):"
            << rt << " (" << errno << ") (" << strerror(errno) <<") fd_ctx->event="
            << fd_ctx->events;
        // Was `return -1`, which converts to true in a bool function and
        // reported the failure as success.
        return false;
    }

    if(run){
        fd_ctx->triggerEvent(event);
    }
    else{
        fd_ctx->resetContext(event);
    }
    --m_pendingEventCount;
    return true;
}

/**
 * @brief Removes @p event from @p fd; forwards to _delEvent() with run=false,
 *        so the stored context is reset rather than triggered.
 * @return the result of _delEvent()
 */
bool IOManager::delEvent(int fd, Event event){
    QTCH_ASSERT(event == Event::READ || event == Event::WRITE);
    const bool run_handler = false;
    return _delEvent(fd, event, run_handler);
}

/**
 * @brief Removes @p event from @p fd; forwards to _delEvent() with run=true,
 *        so the stored context is triggered as part of the removal.
 * @return the result of _delEvent()
 */
bool IOManager::cancelEvent(int fd, Event event){
    QTCH_ASSERT(event == Event::READ || event == Event::WRITE);
    const bool run_handler = true;
    return _delEvent(fd, event, run_handler);
}

/**
 * @brief Writes one byte into the tickle pipe when hasIdleThreads() is true.
 *
 * The pipe's read end is registered with epoll in the constructor, so the
 * byte makes a pending epoll_wait() in idle() return.
 */
void IOManager::tickle(){
    if(hasIdleThreads()){
        int rt = write(m_tickleFds[1],"T",1);
        QTCH_ASSERT(rt == 1);
    }
}

void IOManager::idle(){
    QTCH_LOG_DEBUG(logger) <<"begin idle";
    const uint64_t MAX_EVENTS = 256;
    epoll_event * events = new epoll_event[MAX_EVENTS]();
    std::shared_ptr<epoll_event> shared_events(events,[](epoll_event * ptr){
        delete[] ptr;
    });
    while(true){
        uint64_t next_timeout = 0;
        if(stopping(next_timeout)){
            QTCH_LOG_DEBUG(logger) << "name="<< getName()
                                << " idle stopping exit";
            break;
        }
        int rt = 0;
        do{
            static const int MAX_TIMEOUT = 3000;
            if(next_timeout != ~0ull){
                next_timeout = (int)next_timeout > MAX_TIMEOUT ? MAX_TIMEOUT : next_timeout;
            }
            else{
                next_timeout = MAX_TIMEOUT;
            }
            rt = epoll_wait(m_epfd,events,MAX_EVENTS,next_timeout);
            if( rt < 0 && errno == EINTR){
            } else {
                break;
            }
        }while(true);

        // 调度定时器设定的任务
        std::vector<std::function<void()> >cbs;
        listExpectTimer(cbs);
        if(!cbs.empty()){
            schedule(cbs.begin(),cbs.end());
            cbs.clear();
        }
        for(int i = 0; i < rt ; ++i){
            epoll_event & event = events[i];
            if(event.data.fd == m_tickleFds[0]){
                uint8_t dummy[256];
                while(read(m_tickleFds[0],dummy,sizeof(dummy)) > 0);
                continue;
            }
            FdContext* fd_ctx = (FdContext*)event.data.ptr;
            FdContext::MutexType::Lock lock(fd_ctx->mutex);
            if(event.events & (EPOLLERR | EPOLLHUP)){
                event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events;
            }
            int real_event = Event::None;
            if(event.events & EPOLLIN){
                real_event |= Event::READ;
            }
            if(event.events & EPOLLOUT){
                real_event |= Event::WRITE;
            }
            if((fd_ctx->events & real_event) == Event::None){
                continue;
            }
            int left_event = (fd_ctx->events & ~real_event);
            int op = left_event ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
            event.events = EPOLLET | left_event;
            int rt2 = epoll_ctl(m_epfd,op, fd_ctx->fd,&event);
            if(rt2){
                QTCH_LOG_ERROR(logger) << "epoll_ctl("<<m_epfd<<", "
                    << op << ", " << fd_ctx->fd << ", " <<event.events << "):"
                    << rt << " (" << errno << ") (" << strerror(errno) <<") fd_ctx->event="
                    << fd_ctx->events;
                continue;
            }
            if(real_event & Event::READ){
                fd_ctx->triggerEvent(Event::READ);
                --m_pendingEventCount;
            }
            if(real_event & Event::WRITE){
                fd_ctx->triggerEvent(Event::WRITE);
                --m_pendingEventCount;
            }
        }
        Fiber::yieldToReady();
    }
}

/**
 * @brief True only when no IO event is pending and Scheduler::stopping() is
 *        true. The base check is skipped while events are still pending.
 */
bool IOManager::stopping(){
    if(m_pendingEventCount != 0){
        return false;
    }
    return Scheduler::stopping();
}

/**
 * @brief Variant of stopping() that also reports the next timer deadline.
 *
 * @param[out] next_timeout receives the value of getNextTime()
 * @return true only when getNextTime() returned ~0ull and stopping() is true
 */
bool IOManager::stopping(uint64_t& next_timeout){
    next_timeout = getNextTime();
    if(next_timeout != ~0ull){
        return false;
    }
    return stopping();
}

/**
 * @brief Grows the fd-context table to at least @p size entries and makes sure
 *        every slot holds an FdContext whose `fd` equals its index.
 *
 * The table is never shrunk: the entries are raw owning pointers, so a
 * shrinking resize() would drop them without deleting them and remove
 * contexts that may still be registered with epoll. A request smaller than
 * the current size therefore only fills empty slots.
 *
 * @param size requested minimum number of entries
 */
void IOManager::contextResize(size_t size){
    if(size > m_fdContexts.size()){
        m_fdContexts.resize(size);
    }
    for(size_t i = 0; i < m_fdContexts.size(); ++i){
        if(!m_fdContexts[i]){
            m_fdContexts[i] = new FdContext();
            m_fdContexts[i]->fd = static_cast<int>(i);
        }
    }
}

/**
 * @brief Returns a reference to the context slot that belongs to @p event.
 *
 * @param event IOManager::READ selects `read`, IOManager::WRITE selects `write`
 * @throws std::invalid_argument for any other value (after the assertion fires)
 */
IOManager::FdContext::EventContext& IOManager::FdContext::getContext(Event event){
    if(event == IOManager::READ){
        return read;
    }
    if(event == IOManager::WRITE){
        return write;
    }
    QTCH_ASSERT2(false,"IOManager::FdContext::getContext");
    throw std::invalid_argument("getContext event invalid");
}

/**
 * @brief Clears @p event from `events` and empties its stored context
 *        (callback, fiber and scheduler).
 *
 * @param event the event to reset; must currently be set in `events`
 */
void  IOManager::FdContext::resetContext(Event event){
    QTCH_ASSERT(events & event);
    // Bind by reference. The previous code copied the slot and cleared the
    // copy, so the stored callback/fiber/scheduler were never released.
    EventContext& eventContext = getContext(event);
    events = (Event)(events & ~event);
    eventContext.cb = nullptr;
    eventContext.fiber.reset();
    eventContext.scheduler = nullptr;
}

/**
 * @brief Hands the work stored for @p event to its scheduler, then resets the
 *        context. Does nothing when @p event is not set in `events`.
 *
 * The callback is scheduled when one is stored; otherwise the stored fiber is.
 *
 * @param event exactly one of Event::READ / Event::WRITE
 */
void  IOManager::FdContext::triggerEvent(Event event){
    QTCH_ASSERT(event == Event::READ || event == Event::WRITE);
    if(event & events){
        // Work on a copy of the stored slot; resetContext() below is what
        // clears the bit in `events`.
        FdContext::EventContext pending = getContext(event);
        if(!pending.cb){
            pending.scheduler->schedule(pending.fiber);
        }
        else{
            pending.scheduler->schedule(pending.cb);
        }
        resetContext(event);
    }
}

/**
 * @brief Removes both READ and WRITE from @p fd without running their contexts.
 *
 * Both removals are always attempted, READ first.
 * @return true if at least one of the two _delEvent() calls returned true
 */
bool IOManager::delEventAll(int fd){
    const bool read_removed = _delEvent(fd, READ, false);
    const bool write_removed = _delEvent(fd, WRITE, false);
    return read_removed || write_removed;
}

/**
 * @brief Removes both READ and WRITE from @p fd, triggering their contexts.
 *
 * Both removals are always attempted, READ first.
 * @return true if at least one of the two _delEvent() calls returned true
 */
bool IOManager::cancelEventAll(int fd){
    const bool read_cancelled = _delEvent(fd, READ, true);
    const bool write_cancelled = _delEvent(fd, WRITE, true);
    return read_cancelled || write_cancelled;
}

void IOManager::onTimerInsertedAtFront(){
    tickle();
}

/**
 * @brief Returns Scheduler::getThis() downcast to IOManager.
 * @return the IOManager pointer, or nullptr when the dynamic_cast fails
 */
IOManager* IOManager::getThis(){
    auto* current = Scheduler::getThis();
    return dynamic_cast<IOManager*>(current);
}


}